package com.atguigu.gmall.realtime.dwd.db.app;

import com.atguigu.gmall.realtime.common.base.BaseSQLApp;
import com.atguigu.gmall.realtime.common.constant.Constant;
import com.atguigu.gmall.realtime.common.util.SQLUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import java.time.Duration;

/**
 * @Author atguigu
 * @Date 2023/7/7 14:16
 */
public class Dwd_08_DwdTradeRefundPaySuccess extends BaseSQLApp {
    public static void main(String[] args) {
        new Dwd_08_DwdTradeRefundPaySuccess().start(
            30008,
            2,
            "Dwd_08_DwdTradeRefundPaySuccess"
        );
        
    }
    
    @Override
    public void handle(StreamExecutionEnvironment env,
                       StreamTableEnvironment tEnv) {
        tEnv.getConfig().setIdleStateRetention(Duration.ofSeconds(5));
        
        // 1. 读取 ods_db
        readOdsDb(tEnv);
        // 2. 读取 字典表
        readBaseDic(tEnv);
        // 3. 过滤退款成功表数据
        Table refundPayment = tEnv.sqlQuery(
            "select " +
                "data['id'] id," +
                "data['order_id'] order_id," +
                "data['sku_id'] sku_id," +
                "data['payment_type'] payment_type," +
                "data['callback_time'] callback_time," +
                "data['total_amount'] total_amount," +
                "pt, " +
                "ts " +
                "from ods_db " +
                "where `database`='gmall2023' " +
                "and `table`='refund_payment' " +
                "and `type`='update' " +
                "and `old`['refund_status'] is not null " +
                "and `data`['refund_status']='1602'");
        tEnv.createTemporaryView("refund_payment", refundPayment);
        
        // 4. 过滤退单表中的退单成功的数据
        Table orderRefundInfo = tEnv.sqlQuery(
            "select " +
                "data['order_id'] order_id," +
                "data['sku_id'] sku_id," +
                "data['refund_num'] refund_num " +
                "from ods_db " +
                "where `database`='gmall2023' " +
                "and `table`='order_refund_info' " +
                "and `type`='update' " +
                "and `old`['refund_status'] is not null " +
                "and `data`['refund_status']='0705'");
        tEnv.createTemporaryView("order_refund_info", orderRefundInfo);
        
        // 5. 过滤订单表中的退款成功的数据
        Table orderInfo = tEnv.sqlQuery(
            "select " +
                "data['id'] id," +
                "data['user_id'] user_id," +
                "data['province_id'] province_id " +
                "from ods_db " +
                "where `database`='gmall2023' " +
                "and `table`='order_info' " +
                "and `type`='update' " +
                "and `old`['order_status'] is not null " +
                "and `data`['order_status']='1006'");
        tEnv.createTemporaryView("order_info", orderInfo);
        
        // 6. 4 张表的 join
        Table result = tEnv.sqlQuery(
            "select " +
                "rp.id," +
                "oi.user_id," +
                "rp.order_id," +
                "rp.sku_id," +
                "oi.province_id," +
                "rp.payment_type," +
                "dic.info.dic_name payment_type_name," +
                "date_format(rp.callback_time,'yyyy-MM-dd') date_id," +
                "rp.callback_time," +
                "ori.refund_num," +
                "rp.total_amount," +
                "rp.ts " +
                "from refund_payment rp " +
                "join order_refund_info ori " +
                "on rp.order_id=ori.order_id and rp.sku_id=ori.sku_id " +
                "join order_info oi " +
                "on rp.order_id=oi.id " +
                "join base_dic for system_time as of rp.pt as dic " +
                "on rp.payment_type=dic.dic_code ");
       
        // 7.写出到 kafka
        tEnv.executeSql("create table dwd_trade_refund_payment_success(" +
                            "id string," +
                            "user_id string," +
                            "order_id string," +
                            "sku_id string," +
                            "province_id string," +
                            "payment_type_code string," +
                            "payment_type_name string," +
                            "date_id string," +
                            "callback_time string," +
                            "refund_num string," +
                            "refund_amount string," +
                            "ts bigint " +
                            ")" + SQLUtil.getKafkaDDLSink(Constant.TOPIC_DWD_TRADE_REFUND_PAYMENT_SUCCESS));
        result.executeInsert("dwd_trade_refund_payment_success");
        
        
    }
}
/*
交易域退款成功事务事实表
业务过程: 商家打款
粒度:   商品
表:
    refund_payment update 1601=>1602
        join
    order_refund_info  update   0705
        join
    order_info     update  1005=>1006
    
    字典表
    
    ttl: 5s
    
    
    





*/